package cn.gupao.udfs;

import cn.gupao.service.QueryService;
import cn.gupao.utils.StateDescriptorUtils;
import cn.gupao.pojo.*;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.LinkedMap;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.rule.FactHandle;
import org.kie.internal.utils.KieHelper;

import java.util.Map;

/**
 * Matches user events (keyed by a dynamically chosen key) against rules that arrive on a
 * broadcast stream.
 *
 * <p>Each active rule is kept in two places:
 * <ul>
 *   <li>the broadcast state (the {@link RulesBean} itself), so the rule set survives restarts;</li>
 *   <li>an in-memory map from rule id to the parsed {@link RuleCondition} and a compiled
 *       {@link KieSession}, rebuilt from the broadcast state in {@link #initializeState}.</li>
 * </ul>
 *
 * <p>For every incoming event, the event is appended to a TTL-bounded keyed list state and then
 * evaluated against every active rule; a {@link MatchResult} is emitted for each rule that matches.
 */
public class RulesMatchFunctionV4 extends KeyedBroadcastProcessFunction<String, DynamicKeyedBean, RulesBean, MatchResult> implements CheckpointedFunction {

    /** Value of {@code rule_status} that marks a rule as enabled; any other value disables it. */
    private static final int RULE_STATUS_ENABLED = 1;

    /**
     * Rule id -> (parsed rule condition, compiled Drools session).
     * Runtime-only cache rebuilt from broadcast state, hence transient.
     */
    private transient Map<Long, Tuple2<RuleCondition, KieSession>> id2RuleConditionAndKieSession;

    /** Query helper handed to every {@link FactBean}; created in {@link #open}. */
    private transient QueryService queryService;

    /** Keyed state holding the recent events of the current key. */
    private transient ListState<EventStateBean> eventsState;


    /**
     * Rebuilds the in-memory rule cache from the broadcast state.
     *
     * @param context gives access to the operator state store holding the broadcast rules
     * @throws Exception if the broadcast state cannot be read or a stored rule cannot be compiled
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        id2RuleConditionAndKieSession = new LinkedMap<>();
        BroadcastState<Long, RulesBean> broadcastState = context.getOperatorStateStore().getBroadcastState(StateDescriptorUtils.rulesStateDescriptor);
        for (Map.Entry<Long, RulesBean> entry : broadcastState.entries()) {
            RulesBean rulesBean = entry.getValue();
            RuleCondition ruleCondition = JSON.parseObject(rulesBean.getRule_condition_json(), RuleCondition.class);
            KieSession kieSession = createKieSession(rulesBean.getRule_controller_drl());
            id2RuleConditionAndKieSession.put(entry.getKey(), Tuple2.of(ruleCondition, kieSession));
        }
    }

    /**
     * Creates the keyed event state (with a 2-hour TTL) and initializes the query service.
     *
     * @param parameters the configuration passed by the runtime (not read here)
     * @throws Exception if state registration or query-service initialization fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // Global job parameters supplied by the client at submission time.
        ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

        // Keyed list state caching the recent events; entries expire after 2 hours.
        ListStateDescriptor<EventStateBean> eventStateDescriptor = new ListStateDescriptor<>("event-state", EventStateBean.class);
        StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.hours(2)).build();
        eventStateDescriptor.enableTimeToLive(stateTtlConfig);
        eventsState = getRuntimeContext().getListState(eventStateDescriptor);

        queryService = new QueryService();
        queryService.init(parameterTool, eventsState);
    }

    /**
     * Disposes every cached {@link KieSession} when the operator shuts down.
     *
     * @throws Exception if the superclass close fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (id2RuleConditionAndKieSession != null) {
                for (Tuple2<RuleCondition, KieSession> holder : id2RuleConditionAndKieSession.values()) {
                    disposeSession(holder);
                }
                id2RuleConditionAndKieSession.clear();
            }
        } finally {
            super.close();
        }
    }

    /**
     * Handles one user event: records it in keyed state, then evaluates it against every
     * active rule and emits a {@link MatchResult} per matching rule.
     *
     * @param bean the keyed wrapper carrying the event
     * @param ctx  read-only context of the keyed side
     * @param out  collector for match results
     * @throws Exception if state access or rule evaluation fails
     */
    @Override
    public void processElement(DynamicKeyedBean bean, ReadOnlyContext ctx, Collector<MatchResult> out) throws Exception {

        LogBean logBean = bean.getLogBean();
        eventsState.add(logBean.toEventStateBean());

        for (Map.Entry<Long, Tuple2<RuleCondition, KieSession>> entry : id2RuleConditionAndKieSession.entrySet()) {
            Long ruleId = entry.getKey();
            Tuple2<RuleCondition, KieSession> tp = entry.getValue();
            RuleCondition ruleCondition = tp.f0;
            KieSession kieSession = tp.f1;

            FactBean factBean = new FactBean(logBean, ruleCondition, queryService, false);
            FactHandle factHandle = kieSession.insert(factBean);
            try {
                kieSession.fireAllRules();
            } finally {
                // The session is reused for every event; retract the fact so the working
                // memory does not grow without bound.
                kieSession.delete(factHandle);
            }

            if (factBean.isMatch()) {
                out.collect(new MatchResult(logBean.getDeviceId(), logBean.getTimeStamp(), System.currentTimeMillis(), ruleId));
            }
        }
    }


    /**
     * Handles one broadcast rule change. A rule with status {@code 1} is added or replaced;
     * any other status removes the rule.
     *
     * @param rulesBean the rule definition (condition JSON plus DRL)
     * @param ctx       context giving writable access to the broadcast state
     * @param out       unused; rule changes emit nothing
     * @throws Exception if the rule cannot be parsed/compiled or the state cannot be updated
     */
    @Override
    public void processBroadcastElement(RulesBean rulesBean, Context ctx, Collector<MatchResult> out) throws Exception {

        int status = rulesBean.getRule_status();
        Long ruleId = rulesBean.getId();

        BroadcastState<Long, RulesBean> broadcastState = ctx.getBroadcastState(StateDescriptorUtils.rulesStateDescriptor);

        if (status == RULE_STATUS_ENABLED) {
            // Parse and compile BEFORE touching any state: a malformed rule must not be
            // persisted, otherwise restoring from a checkpoint would fail on it again.
            RuleCondition ruleCondition = JSON.parseObject(rulesBean.getRule_condition_json(), RuleCondition.class);
            KieSession kieSession = createKieSession(rulesBean.getRule_controller_drl());

            broadcastState.put(ruleId, rulesBean);
            Tuple2<RuleCondition, KieSession> previous = id2RuleConditionAndKieSession.put(ruleId, Tuple2.of(ruleCondition, kieSession));
            // An update replaces the session; release the one it superseded.
            disposeSession(previous);
        } else {
            broadcastState.remove(ruleId);
            disposeSession(id2RuleConditionAndKieSession.remove(ruleId));
        }
    }


    /**
     * Intentionally empty: the rules live in broadcast state and the events in keyed state,
     * both registered through the state stores; the in-memory cache is rebuilt in
     * {@link #initializeState}.
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {

    }

    /**
     * Compiles the given DRL text and opens a new session on the resulting knowledge base.
     *
     * @param drl rule source in DRL format
     * @return a new session for the compiled rules
     */
    private KieSession createKieSession(String drl) {
        KieHelper kieHelper = new KieHelper();
        kieHelper.addContent(drl, ResourceType.DRL);
        return kieHelper.build().newKieSession();
    }

    /** Disposes the session held by {@code holder}; a {@code null} holder or session is ignored. */
    private static void disposeSession(Tuple2<RuleCondition, KieSession> holder) {
        if (holder != null && holder.f1 != null) {
            holder.f1.dispose();
        }
    }
}
